package com.uxsino.reactorq.commons;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;

/**
 * A {@link Flux} that dispatches messages received from a JMS {@link Queue}.
 * See {@link QueueConnection}.
 *
 * @param <T> the type of object emitted by this flux
 */
public final class QueueFlux<T> extends JMSFlux<T> {
    private static final Logger logger = LoggerFactory.getLogger(QueueFlux.class);

    /**
     * Creates a queue flux with no receiver id and without client acknowledgement.
     *
     * @param connection the connection opened for this flux
     * @param queueName queue name
     * @param cls the type of object to receive
     * @throws JMSException if the underlying JMS resources cannot be opened
     */
    public QueueFlux(QueueConnection connection, String queueName, Class<? extends T> cls) throws JMSException {
        this(connection, queueName, cls, null, false);
    }

    /**
     * Creates a queue flux with no receiver id.
     *
     * @param connection the connection opened for this flux
     * @param queueName queue name
     * @param cls the type of object to receive
     * @param acknowledge {@code true} to use client acknowledgement, {@code false} for automatic acknowledgement
     * @throws JMSException if the underlying JMS resources cannot be opened
     */
    public QueueFlux(QueueConnection connection, String queueName, Class<? extends T> cls, boolean acknowledge)
        throws JMSException {
        this(connection, queueName, cls, null, acknowledge);
    }

    /**
     * Creates a queue flux.
     *
     * @param connection the connection opened for this flux
     * @param queueName queue name
     * @param cls the type of object to receive
     * @param receiverId the id of receiver
     * @param acknowledge {@code true} to use client acknowledgement, {@code false} for automatic acknowledgement
     * @throws JMSException if the underlying JMS resources cannot be opened
     */
    public QueueFlux(QueueConnection connection, String queueName, Class<? extends T> cls, String receiverId,
        boolean acknowledge) throws JMSException {
        super(connection, queueName, cls, receiverId, acknowledge);
    }

    /**
     * Creates a queue flux, leaving the acknowledgement setting to the superclass constructor.
     *
     * @param connection the connection opened for this flux
     * @param queueName queue name
     * @param cls the type of object to receive
     * @param receiverId the id of receiver
     * @throws JMSException if the underlying JMS resources cannot be opened
     */
    public QueueFlux(QueueConnection connection, String queueName, Class<? extends T> cls, String receiverId)
        throws JMSException {
        super(connection, queueName, cls, receiverId);
    }

    /**
     * Gets the {@link Queue} internally used.
     *
     * @return the destination of this flux, cast to {@link Queue}
     */
    public Queue getQueue() {
        return (Queue) destination;
    }

    /**
     * Gets the name of the internal queue.
     *
     * @return the destination name
     */
    public String getQueueName() {
        return destinationName;
    }

    /**
     * Closes the session if one is open. Closing is best effort: a failure is
     * logged and not propagated to the caller.
     */
    public void dispose() {
        try {
            if (session != null) {
                session.close();
            }
        } catch (JMSException e) {
            // Best-effort close: report the failure instead of silently dropping it.
            logger.warn("Failed to close JMS session for queue {}", destinationName, e);
        }
    }

    /**
     * Opens a non-transacted queue session whose acknowledge mode follows the
     * {@code acknowledge} flag.
     *
     * @param conn the connection to open the session on; when {@code null} the
     *        connection held by this flux is used instead
     * @return the newly created session
     * @throws JMSException if the session cannot be created
     */
    @Override
    protected Session openSession(Connection conn) throws JMSException {
        // Prefer the connection handed to this hook; fall back to the field so
        // callers that relied on the previous behavior keep working.
        QueueConnection queueConnection = (QueueConnection) (conn != null ? conn : connection);
        int acknowledgeMode;
        if (acknowledge) {
            logger.info("设置Queue接收端消息确认...");
            // If the receiver does not acknowledge a message, ActiveMQ keeps the
            // message until some receiver acknowledges it.
            acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
        } else {
            logger.info("设置Queue消息自动确认...");
            acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        }
        return queueConnection.createQueueSession(false, acknowledgeMode);
    }

    /**
     * Opens the queue to consume from: the named queue when a destination name
     * is set, otherwise a temporary queue.
     *
     * @param session the session used to create the queue
     * @return the queue destination
     * @throws JMSException if the queue cannot be created
     */
    @Override
    protected Destination openDestination(Session session) throws JMSException {
        if (destinationName == null) {
            return session.createTemporaryQueue();
        }
        return ((QueueSession) session).createQueue(destinationName);
    }

    /**
     * Opens a receiver on the queue of this flux.
     *
     * @param session the session used to create the receiver
     * @param selector the message selector, or {@code null} to create the receiver without a selector
     * @return the queue receiver
     * @throws JMSException if the receiver cannot be created
     */
    @Override
    protected MessageConsumer openMessageConsumer(Session session, String selector) throws JMSException {
        QueueSession queueSession = (QueueSession) session;
        Queue queue = (Queue) destination;
        if (selector == null) {
            return queueSession.createReceiver(queue);
        }
        return queueSession.createReceiver(queue, selector);
    }

}